Queue หรือ แถวคอย มีลักษณะการทำงานเหมือน FIFO (First-In-First-Out) เป็นโครงสร้างข้อมูลพื้นฐาน หรืออาจมองว่าเป็น คลาส (Class) หรือ ออปเจกต์ (Object) ก็ได้ คือ มีข้อมูลเก็บไว้ภายในตามความจุที่กำหนดไว้ (อย่างน้อย 1) และมีคำสั่งหรือวิธีการนำข้อมูลไปใส่ไว้ข้างใน หรือดึงข้อมูลออกมาตามลำดับ (อาจจะเรียกว่า Put หรือ Send สำหรับการนำข้อมูลไปใส่ และ Get หรือ Receive สำหรับการนำข้อมูลออกมา ตามลำดับ) ข้อมูลที่ถูกนำไปใส่ก่อนจะต้องเป็นข้อมูลที่ถูกนำออกมาก่อน
ถ้าเราใช้ FreeRTOS ก็สามารถเลือกใช้คำสั่งของ Queue API ได้ และเป็นวิธีการหนึ่งที่สามารถใช้สื่อสารข้อมูลและรอจังหวะหรือประสานการทำงานระหว่างทาส์กได้ สมมุติว่า เริ่มต้นยังไม่มีข้อมูลใด ๆ ถ้านำข้อมูลไปใส่ ก็สามารถทำได้เลย ไม่ต้องรอ แต่ถ้าใส่ข้อมูลตามจำนวนของความจุแล้ว (Full) จะใส่ข้อมูลถัดไป ต้องรอ (Waiting) หรือถูกหยุดไว้ชั่วคราว (Blocked) ในทางตรงกันข้าม ถ้าไม่มีข้อมูลใด ๆ เลยในแถวคอย (Empty) และต้องการดึงข้อมูลออกมา จะไม่สามารถทำได้ ดังนั้นจึงต้องรอ หรือถูกหยุดไว้ชั่วคราว
ถ้าเราแบ่งการทำงานเป็นสองฝ่ายคือ ฝ่ายที่นำข้อมูลไปใส่ กับฝ่ายที่นำข้อมูลออกมา การทำงานในลักษณะนี้ก็เรียกว่า Producer และ Consumer ทั้งสองฝ่ายสามารถทำงานอยู่ในรูปของทาส์ก และสื่อสารข้อมูลกันโดยใช้ Queue
ถ้ากำหนดให้มีความจุเท่ากับ 1 การทำงานของ Producer และ Consumer จะเป็นดังนี้ ข้อมูลที่นำไปใส่ใน Queue จะเปรียบเสมือนโทเคน (Token) และโทเคนอาจเป็นข้อมูลที่ถูกส่งต่อกันระหว่างทาส์ก เช่น ค่าจากการอ่านเซนเซอร์ เป็นต้น
ถ้ายังว่างอยู่ Producer จะสามารถนำโทเคนไปใส่ได้ แต่ถ้ามีโทเคนอยู่แล้ว Producer จะต้องรอก่อน และการทำงานในส่วนของ Consumer ก็จะต้องพยายามดึงโทเคนออกมา แต่ถ้าในขณะนั้นไม่มีโทเคนเหลืออยู่ ก็จะต้องรอจนกว่า โทเคนถัดไปถูกใส่เข้ามาโดย Producer
โค้ดตัวอย่าง: Single Producer & Single Consumer
เราเริ่มต้นโค้ดตัวอย่างด้วยการสร้างทาส์ก Producer และ Consumer ที่มีระดับความสำคัญเท่ากัน และสร้าง Queue ที่มีความจุเท่ากับ 1 และเก็บข้อมูลแบบ uint16_t
ในตัวอย่างนี้ ทาส์ก Producer จะนำข้อมูลที่เป็นค่าของตัวนับขึ้นทีละหนึ่ง เพื่อนำไปใส่ใน Queue และมีการเว้นระยะเวลาโดยการสุ่มค่า (อยู่ในช่วงระยะเวลาประมาณ 100 ถึง 1000 มิลลิวินาที) ในขณะที่ทาส์ก Consumer จะคอยอ่านข้อมูลออกมา แล้วส่งออกทาง Serial ไปยังคอมพิวเตอร์ เป็นข้อความเพื่อนำไปแสดงผลต่อไป
#include <Arduino_FreeRTOS.h>
#include <task.h>
#include <queue.h>#define QUEUE_CAPACITY 1// global variables
QueueHandle_t queue;// function prototypes for task functions
void taskProducer( void *pvParameters );
void taskConsumer( void *pvParameters );void setup() {
Serial.begin( 115200 );
randomSeed( analogRead(0) );
// create a queue for storing 16-bit unsigned integer(s)
queue = xQueueCreate( QUEUE_CAPACITY, sizeof(uint16_t) ); // create the producer task
int priority = (tskIDLE_PRIORITY + 1);
xTaskCreate( taskProducer, "P", 128,
(void*)0, priority, NULL
); // create the consumer task
xTaskCreate( taskConsumer, "C", 128,
(void*)1, priority, NULL
);
// Note the task scheduler is started automatically.
}void loop() {}void taskProducer( void *pvParameters ) { // task function
int id = (int)pvParameters;
uint16_t wdata = 0;
while(1) {
vTaskDelay( pdMS_TO_TICKS( random(100,1000) ) );
if (xQueueSend(queue, &wdata, portMAX_DELAY)==pdTRUE){
wdata = (wdata+1) % 10000;
}
}
}void taskConsumer( void *pvParameters ) { // task function
int id = (int)pvParameters;
uint16_t rdata;
char sbuf[32];
vTaskDelay( pdMS_TO_TICKS( random(1000,2000) ) );
while(1) {
if (xQueueReceive(queue, &rdata, portMAX_DELAY)==pdTRUE){
sprintf( sbuf, "received: [%04u]", rdata );
Serial.println( sbuf );
}
}
}
ถ้าทดสอบการทำงานของโค้ดตัวอย่างนี้ เช่น ใช้บอร์ด Arduino Uno หรือ Nano จะได้รับข้อความทาง Serial Monitor และจะเห็นได้ว่า ข้อมูลตัวเลขซึ่งเป็นค่าของตัวนับนั้น จะเรียงกันไปตามลำดับ และไม่มีข้อมูลสูญหายหรือถูกข้ามไป
โค้ดตัวอย่าง: Single Producer & Multiple Consumers
ถัดไปลองเขียนโค้ดใหม่ โดยเพิ่มจำนวนทาส์กที่เป็น Consumer แต่ยังให้จำนวนของทาส์กที่เป็น Producer มีเพียงหนึ่งเดียวเท่านั้น และใช้ Queue ร่วมกันทั้งหมด แต่เพิ่มความจุ (Queue Capacity) ให้มากกว่า 1 เช่น เพิ่มขึ้นเป็น 4
#include <Arduino_FreeRTOS.h>
#include <task.h>
#include <queue.h>
#include <semphr.h>#define QUEUE_CAPACITY 4
#define NUM_CONSUMERS 4// global variables
QueueHandle_t queue;
SemaphoreHandle_t serial_mutex;
char sbuf[32]; // used for sprintf()// function prototypes for task functions
void taskProducer( void *pvParameters );
void taskConsumer( void *pvParameters );void setup() {
Serial.begin( 115200 );
randomSeed( analogRead(0) );
// create a mutex
serial_mutex = xSemaphoreCreateMutex( );
// create a queue for storing 16-bit unsigned integer(s)
queue = xQueueCreate( QUEUE_CAPACITY, sizeof(uint16_t) ); char sbuf[32];
sprintf( sbuf, "capacity: %d",
uxQueueSpacesAvailable(queue) );
Serial.println( sbuf ); int priority = (tskIDLE_PRIORITY + 1);
// create a producer task
xTaskCreate( taskProducer, "P", 100,
(void*)0, priority, NULL
);
// create consumer tasks
char name[4];
for ( int i=1; i <= NUM_CONSUMERS; i++ ) {
sprintf( name, "C%d", i );
xTaskCreate( taskConsumer, name, 100,
(void*)(i), priority, NULL
);
}
// Note the task scheduler is started automatically.
}void loop() {}void taskProducer( void *pvParameters ) { // task function
int id = (int)pvParameters;
uint16_t wdata = 0;
while(1) {
if (xQueueSend(queue, &wdata, portMAX_DELAY)==pdTRUE){
wdata = (wdata+1) % 10000;
}
}
}void taskConsumer( void *pvParameters ) { // task function
int id = (int)pvParameters;
uint16_t rdata;
while(1) {
vTaskDelay( pdMS_TO_TICKS( random(200,500) ) );
if (xQueueReceive(queue, &rdata, portMAX_DELAY)==pdTRUE){
xSemaphoreTake( serial_mutex, portMAX_DELAY );
sprintf( sbuf, "received [C%d]: [%04u]", id, rdata );
Serial.println( sbuf );
xSemaphoreGive( serial_mutex );
}
}
}
ทาสก์ Consumer ที่สามารถอ่านข้อมูลจาก Queue ได้ จะส่งข้อความออกทาง Serial แสดงข้อมูลที่อ่านได้โดยทาส์กใด โดยมีการเข้าถึงและใช้ Serial ร่วมกัน โดยใช้ Mutex เป็นตัวจัดการ
โค้ดตัวอย่าง: Multi-Channel Analog Reading
ถัดไปเป็นตัวอย่างการให้ทาส์ก Producer อ่านค่าอินพุตจากขาแอนะล็อก เช่น A0 ถึง A4 ไปตามลำดับ เก็บค่าที่อ่านได้ในแต่ละช่องเป็นแบบ uint16_t
แล้วส่งเป็นข้อมูลพร้อมระบุหมายเลขช่องและลำดับของข้อมูลตามโครงสร้าง data_t
ที่ได้ประกาศใช้งาน
ในส่วนการทำงานของทาส์ก Consumer ก็รับค่าจาก Queue ที่มีความจุเท่ากับ 4 แล้วนำไปแสดงผลเป็นข้อความส่งทาง Serial
#include <Arduino_FreeRTOS.h>
#include <task.h>
#include <queue.h>
#include <semphr.h>#define QUEUE_CAPACITY 4
#define NUM_ANALOG_CHANNELS 5typedef struct _data {
uint8_t seq_num;
uint8_t channel;
union {
uint8_t b[2];
uint16_t value;
};
} __attribute__((packed)) data_t;// global variables
QueueHandle_t queue;
char sbuf[64]; // used for sprintf()// function prototypes for task functions
void taskProducer( void *pvParameters );
void taskConsumer( void *pvParameters );void setup() {
Serial.begin( 115200 );
randomSeed( analogRead(0) );// create a queue
queue = xQueueCreate( QUEUE_CAPACITY, sizeof(data_t) );int priority = (tskIDLE_PRIORITY + 1);
// create the producer task
xTaskCreate( taskProducer, "P", 128,
(void*)0, priority, NULL
);
// create the consumer task
xTaskCreate( taskConsumer, "C", 128,
(void*)1, priority, NULL
);
// Note the task scheduler is started automatically.
}void loop() {}void taskProducer( void *pvParameters ) { // task function
int id = (int)pvParameters;
uint8_t cnt = 0;
uint16_t value;
data_t wdata;
data_t *p = &wdata;
while(1) {
for ( int i=0; i < NUM_ANALOG_CHANNELS; i++ ) {
value = analogRead( A0 + i );
p->b[0] = value & 0xff; // low byte
p->b[1] = value >> 8; // high byte
p->channel = i; // channel number
p->seq_num = cnt; // seq. number
while (xQueueSend(queue, &wdata, portMAX_DELAY)!=pdTRUE);
}
cnt++;
}
}void taskConsumer( void *pvParameters ) { // task function
int id = (int)pvParameters;
data_t rdata, *p;
p = &rdata;
while(1) {
vTaskDelay( pdMS_TO_TICKS( random(200,500) ) );
memset( p, 0, sizeof(data_t) );
if (xQueueReceive(queue, p, portMAX_DELAY)==pdTRUE){
sprintf( sbuf, "recv> CHAN=%d, SEQ_NO=%d VALUE=%d",
p->channel, p->seq_num, p->value );
Serial.println( sbuf );
}
}
}
หวังว่าตัวอย่างโค้ดในบทความนี้ จะช่วยให้ผู้อ่านได้เห็นรูปแบบการสื่อสารข้อมูลกันระหว่างทาส์กใน FreeRTOS โดยใช้ Queue และสามารถนำไปประยุกต์ใช้งานได้ต่อไปนะครับ
Creative Commons, Attribution-Non Commercial-Share Alike 4.0 International (CC BY-NC-SA 4.0)