RxJS — pipeable programming

Ittipol Thirasat
4 min readJul 28, 2020

--

เรื่องหนักใจในการทำงานอย่างนึงที่ผมเจออยู่คือการที่เราค้นพบว่า RxJS นี่ทรงพลังมากในการเอามาใช้งาน มันทำได้ทุกอย่าง ทำได้ง่าย ทำได้เร็ว และยังก่อให้เกิดปัญหาน้อยลง ผิดพลาดยากขึ้นด้วย แต่กลับเริ่มต้นได้ยากมากโดยเฉพาะในการอธิบายให้คนที่ไม่เคยใช้เข้าใจว่ามันเป็นอะไรและทำงานยังไงเนื่องจากรูปแบบการทำงานมันค่อนข้างที่จะแตกต่างไปจากการโค้ดที่นิยมกัน

บทความเกี่ยวกับ RxJS เองก็มีเยอะแล้วเพราะผมก็ไม่ใช่คนเดียวที่เจอปัญหานี้น่ะนะ คนที่ใช้แล้วดีก็เจอปัญหาบอกต่อไม่ได้เหมือนๆ กันกับผมนี่แหละ 😂 แต่บทความนี้ (น่าจะ) ต่างออกไปตรงที่ผมจะพูดถึงแนวคิดคร่าวๆ ของโครงการ RxJS แล้วก็พื้นฐานอีกนิดนึง แล้วจะลงไปที่แนวคิดการทำงานในรูปแบบ pipeable programming ที่ผมใช้เป็นหลักอยู่ตอนนี้ซึ่งเค้าไม่รู้ว่าเค้ามีชื่อเรียกกันหรือเปล่า เอาเป็นชื่อนี้ไปก่อนแล้วกันครับ
เป้าหมายของบทความนี้ผมคาดหวังว่าจะทำให้คนที่ไม่เคยใช้ RxJS เลยและคนที่เคยใช้มาบ้างแต่ยังไม่รู้ว่าใช้แล้วจะได้ประโยชน์เพิ่มขึ้นมายังไงได้มองเห็นภาพมากขึ้นนะครับ

RxJS? ReactiveX? Reactive Programming?

ตัว RxJS นี้เป็นส่วนหนึ่งของโครงการ ReactiveX ซึ่งจับเอา Observer, Iterator และ Functional programming มารวมกันเพื่อใช้ในรูปแบบของ Reactive programming ครับ

ขณะที่ Reactive programming หมายถึงการทำงานในรูปแบบที่เน้นไปทางการตอบสนองต่อสิ่งเร้าโดยที่เราเขียนปฏิกิริยาที่จะตอบสนองไว้ล่วงหน้า นั่นคือจะไม่มีการทำงานหากไม่มีอะไรเกิดขึ้นก่อน ส่วนปฏิกิริยาที่เราเขียนไว้ล่วงหน้าก็อย่างเช่นเมื่อเกิดสิ่งนี้แล้วเราจะตอบสนองแบบนั้น ซึ่งจุดหลักในการทำงานจะเป็นการระบุต้นกำเนิดเหตุการณ์ (source), ปฏิกิริยา (reaction), และการผูกปฏิกิริยาเข้ากับต้นกำเนิดเหตุการณ์

ส่วน ReactiveX เป็นโครงการที่สร้าง library ที่รวบรวมเครื่องมือเพื่อให้เราโค้ดแบบ reactive programming ได้สะดวกขึ้นสำหรับหลายๆ platform ซึ่งก็ไม่ได้มีแค่ RxJS แต่ยังมี RxGo, RxPY, RxKotlin, RxSwift และอื่นๆ อีกเต็มไปหมดรวมถึง RxCpp ที่ผมก็ยังได้เอาไปใช้งานทำ embedded system มาด้วย ซึ่งทาง ReactiveX เองก็เคลมว่าจะทำให้ codebase ของเราดีขึ้นได้ดังนี้

Better codebases, using clean input/output, reduce a challenge into a few lines of code, async error handling, concurrency
จากการที่ใช้มาเป็นปีก็คิดแบบนี้เลยครับ แต่เพิ่งเห็นรูปนี้เมื่อวานตอนมาอ่านเว็บเค้า 😅

มาถึงตัวสุดท้ายอย่าง RxJS ที่ถือเป็นหนึ่งในหัวหอกของ ReactiveX ได้รับการพัฒนาและอัปเดตเป็นแนวหน้าเลยทีเดียว โดยเฉพาะการที่ได้ pipeable operator มา นอกจากทำให้ทำ tree-shaking เพื่อให้แอปเราเร็วขึ้นและเบาลงเนื่องจากแอปจะไม่ต้องโหลดส่วนที่ไม่ได้ใช้งานมาให้หนักแล้วก็ยังทำให้เราเอามาเล่นแผลงๆ ได้ง่ายกว่ารูปแบบเดิมด้วย

Observable? Observer?

พูดถึง RxJS แล้วจะไม่พูดถึง Observable กับ Observer เลยก็คงไม่ได้

pull single value — function, pull multiple value — iterator, push single value — promise, push multiple value — observable

Observable หรือของที่สามารถไปสังเกตการณ์ได้ ตัวนี้มักถูกใช้เป็นต้นกำเนิดเหตุการณ์หรือ source ครับ เป็นสิ่งที่เราสามารถไปสังเกตการณ์ได้ว่ามันมีอะไรออกมาเมื่อไหร่ ถ้าเทียบกับตารางด้านบน observable คือสิ่งที่ทำงานในรูปแบบ push ได้หลายค่า คือมาเรียกเราเวลาที่มีค่าใหม่ (ไม่ใช่เราต้องคอยไปตรวจสอบค่าแบบ function และ iterator) และมีค่าออกมาได้หลายค่าเหมือนกับ stream ของข้อมูล (ไม่ใช่ออกมาได้ค่าเดียวแล้วจบแบบ promise)
ตัว RxJS เองนอกจากจะมีเครื่องมือสำหรับ manipulate data stream หลากหลายรูปแบบอย่างทั่วถึงจนทำงานได้เกือบทุกประเภทที่เราต้องการแล้วก็มีเครื่องมือสำหรับแปลงเกือบทุกสิ่งอย่างที่เราใช้กันอยู่ให้ออกมาเป็น observable ได้ทำให้เราสามารถใช้ RxJS ได้โดยที่ไม่ต้องห่วงว่าจะเอาไปทำงานกับอะไร และที่เขียนว่า “เกือบทุกสิ่งอย่าง” ก็แค่ไม่แน่ใจว่ามีอะไรแปลงไม่ได้ไหม แต่ที่ใช้งานมาคือยังไม่เคยเจอสิ่งที่ต้องการใช้แล้วแปลงไม่ได้ แค่จะทำได้ด้วยท่าที่สวยงามหรือดีแค่ไหนเท่านั้นเอง

และอย่ากังวลที่จะแทรก RxJS เข้าไปในโค้ดเดิมๆ เพราะมีเครื่องมือให้เปลี่ยน observable ให้เหมาะสมกับการใช้งานในรูปแบบดั้งเดิมได้

มาที่ฝั่ง Observer หรือผู้สังเกตการณ์บ้าง ตัวนี้เป็นการพูดถึงสิ่งที่เราใช้เพื่อไปจับตามอง observable นั่นแหละ พอ observable มีการเปลี่ยนแปลง observer ก็จะรู้ตัวและไปทำอะไรๆ ต่อตามที่เราได้บอกไว้เพื่อให้โปรแกรมทำงานได้ตามที่เราต้องการนี่แหละครับ โดยคำว่าไปจับตามองเนี่ยเป็นแค่ชื่อนะ เบื้องหลังจริงๆ มันเป็นแบบ push ครับ ไม่ได้กิน resource ไป monitor การเปลี่ยนแปลงจริงๆ ไม่ต้องกังวลไป

Pipe and operator

มี observable & observer แล้วก็ยังคงไม่มีอะไรถ้าเราทำได้แค่นำค่าออกมาอย่างเดียว เนื่องจากเราเปรียบ observable เป็น data stream หากเรานำ data เหล่านั้นไปดำเนินการในรูปแบบเดิมๆ ก็จะเสียของไปหน่อย ReactiveX จึงได้เสนอการทำงานด้วยการ manipulate data stream นั้นด้วยการต่อท่อ (pipe) เข้าไปยังการดำเนินการ (operator) ที่เรียงเป็นลำดับไว้เสมือนเป็นวงจร ทำให้ data ที่ไหลออกมาจากแต่ละ operator นั้นจะสัมพันธ์กับ data ที่ไหลเข้าไปเสมอโดยที่เราไม่จำเป็นต้องคอยดึงมาตรวจสอบ (หากไม่มีบั๊คน่ะนะครับ 😅)

ข้อควรรู้อย่างนึงคือ operator นั้นจะ return data stream ออกมาเสมอ นั่นคือตัวมันเองก็จะกลายเป็น source ให้เราต่อท่อนำ data stream ไปใช้งานได้เช่นกันและแต่ละ source เองก็สามารถต่อท่อออกมาได้หลายท่อทำให้เราสามารถนำ data stream ไปใช้งานพร้อมกันหลายที่ได้อย่างหลากหลายครับ

JavaScript synchronous & asynchronous

เวลาถามว่าเวลาหัดเขียน JavaScript หรือ TypeScript แล้วอะไรยากที่สุด ผมเคยเห็นหลายคน (รวมผมด้วย 😬) ตอบว่าเรื่อง asynchronous นี่แหละครับ ซึ่งเป็นการพูดถึงแบบรวมๆ ของ asynchronous ทั้งหมดทุกรูปแบบไม่ว่าจะเป็น callback, promise, async/await ก็ตาม

Reactive with synchronous & asynchronous

อีกเรื่องนึงที่ผมว่าทำให้ RxJS ทรงพลังคือ RxJS ไม่แคร์ว่าคุณจะเจอกับ synchronous หรือ asynchronous เนื่องจาก data stream มันก็ไหลไปตาม pipe ที่เราวางไว้นี่แหละ ไม่ว่าจะได้ผลทันทีไปทำอย่างอื่นก่อนไม่ได้อย่าง synchronous หรือจะแอบไปทำอย่างอื่นเพราะต้องรอผลนานแสนนานอย่าง asynchronous ตัว data เราจะ flow ไปตามลำดับในวงจรที่เราวางไว้อยู่แล้ว และในแต่ละผังวงจรเองก็จะไม่มีการทำงานข้ามลำดับกันเด็ดขาด แม้ว่าอาจจะทำงานสลับลำดับกันในคนละวงจร แต่ output ของทั้งสองวงจรก็ไม่ได้เกี่ยวข้องกันทำให้ไม่มีผลกระทบในทางปฏิบัติเท่าไหร่นัก

Pipeable programming

ใน Reactive programming นี้ source ที่ให้กำเนิด data ตาม event เป็นสิ่งที่สำคัญมาก วงจรจะถูก drive ด้วย data stream เกือบทั้งหมด

แนวคิดแบบดั้งเดิมเราอาจจะเป็นเมื่อมี event เมื่อ user กรอกข้อมูลในช่องนี้เราจะนำเวลาที่เกิด event ไปเทียบกับ event ก่อนหน้า ถ้าน้อยกว่า 300 milliseconds แล้วจะ return ไม่ทำอะไรต่อ ถ้ามากกว่าให้ดึงค่ามาเก็บใส่ตัวแปรนึงแล้วไปทดลองแปลงเป็นตัวเลข ถ้ามี error ให้ return ถ้าไม่มีให้นำตัวเลขที่ได้ไปบวกกับอีกค่าแล้วเอาไปเก็บในอีกตัวแปรนึงแล้วก็สั่งให้หน้าจออัปเดตผลลัพธ์เป็นค่าจากตัวแปรนี้
แต่ถ้าเป็น RxJS เราจะมองช่องกรอกข้อมูลเป็น source นึง ต่อท่อไปผ่าน operator ที่รอกรองค่าที่เทียบเวลากับค่าก่อนหน้าออก แล้วต่อท่อไปผ่าน operator ที่แปลงค่าเป็นตัวเลข แล้วต่อไปผ่านตัวกรอง error ออก แล้วต่อไปผ่านตัวที่เอาค่าไปบวกกับอีกค่า แล้วต่อท่อจากตรงนี้ไปแสดงผลอีกทีนึง

ดูๆ ไปก็ไม่ได้ต่างอะไรกันเท่าไหร่ใช่ไหมครับ? ก็ถูกแหละเพราะสุดท้ายแล้วการทำงานที่เกิดขึ้นทั้งหมดมันก็ไม่ได้ต่างกันมากหรอกถ้าคุณมี input และ output ที่เหมือนกัน แต่ถ้าเขียนออกมาเป็นโค้ดจริงๆ จะพบว่านอกจากวิธีคิดแล้วภาพด้านบนที่ผมยกมาเรื่อง Functional, less is more และ async error handling เนี่ยจริงมาก โค้ดง่ายขึ้น เกิดช่องโหว่ให้ผิดพลาดได้น้อยลง

และส่วนที่ต่างจริงๆ ไม่ใช่การทำงานแต่เป็นแนวคิดที่เรายึด data flow เป็นหลัก คิดว่า data ตัวนึงจะมี flow จากไหนไปไหนอย่างไรบ้าง เขียนเส้นทางของ data ว่าเป็นยังไงไม่ใช่เมื่อเกิด event แล้วเราจะต้องทำอะไรบ้างแล้วเขียนให้จับ data มายำไปทีละตัว

ผมเรียกรูปแบบที่ใช้งานอยู่ว่าเป็น pipeable programming เพราะตอนผมออกแบบว่าจะเขียนอะไรผมจะวางวงจรการทำงานเป็น source, operator, และปลายทางแล้วทำการ pipe เพื่อเชื่อมทั้งสามส่วนเข้าด้วยกันเป็นวงจรที่พร้อมใช้งานอีกทีนึง ตัวอย่างเช่น Too complicated calculator ตัวนี้ที่ผมจะพาดูจากภาพโครงสร้างตอนคิดตามด้วยการทำงานของมันแล้วจะมีโค้ดที่ทำงานได้จริงให้ตอนท้ายนะครับ

Too complicated calculator circuit diagram
Too complicated calculator circuit diagram

ตัวเครื่องคิดเลขนี้มี input 4 ช่อง แบ่งเป็น text input 3 ช่องและ checkbox 1 ช่อง แล้วมีช่อง output อีกช่องนึง มองข้ามๆ เรื่องบั๊คที่อาจเกิดไปนะครับผมแค่จะนำเสนอตัววงจรการทำงาน 😂 ตัวนี้จะมองว่าเป็นวงจรแบบวงจรไฟฟ้าก็ได้ จะมองว่าเป็น flow chart ก็ตามสะดวก แต่ตอนที่โค้ดจริงๆ ก็จะเขียนกล่อง operator แต่ละกล่องแล้วสั่ง pipe ต่อท่อไปเรื่อยๆ จนออกมาเหมือนในรูปเลยครับ

เริ่มไล่การทำงานจากตรงที่ผมเขียนว่า source แล้วกันครับ ตรงนั้นคือใช้ RxJS จับ input ทั้ง 4 ช่องมาเป็น observable ไว้แล้ว จากนั้นเรานำ input1, 2 ที่เราปล่อยให้เป็น free text โดยป้อนเป็นอักขระใดๆ ก็ได้เนี่ยไปผ่าน operator isNumber (ที่เขียนโดยใช้ operator filter อีกที) มากรองค่าที่ไม่ใช่ตัวเลขออกไปก่อน input1, 2 ก็จะกลายเป็น number1, 2 ไปเนื่องจากด้านหลัง operator isNumber นี้นั้นจะมีเฉพาะค่าที่เป็นตัวเลขเท่านั้นถูกพ่นออกมา ค่าใดๆ ที่ไม่ผ่านเงื่อนไขก็จะหายไปด้านหลัง operator นี้ครับ ไม่ต้องไปสนใจมัน 😬

จากนั้นเรารวม input ทั้ง 4 ตัวด้วย combineLatest ซึ่งจะทำให้ค่าที่พ่นออกมาหลังจากนี้หนึ่งค่าจะเป็น array ของค่าทั้ง 4 ในครั้งเดียว และจะพ่นใหม่ทุกครั้งที่มีการเปลี่ยนแปลงใดๆ กับค่าใดค่าหนึ่ง

ต่อกันไปที่ isOnline ครับ ตัวนี้เราเขียนด้วย partition ตัวนี้จะคล้ายกับ isNumber ที่เขียนด้วย filter ซึ่ง filter นั้นจะกรองค่าที่ไม่ผ่านเงื่อนไขทิ้งไปเลย แต่ partition นี้จะมีช่องทางออกสองทาง (ตัวมันได้ observable ออกมาสองตัว) คือค่าที่ผ่านเงื่อนไขก็จะออกมาในทาง isOnline ส่วนค่าที่ไม่ผ่านเงื่อนไขก็จะวิ่งออกทาง not isOnline ครับ ตัวนี้เราจะแบ่งแยกการทำงานออกเป็นสองสายคือสายที่คำนวณออฟไลน์และออนไลน์

ฝั่งออฟไลน์

ไม่มีอะไรครับ ก็เอาค่า number1, 2 และ operator ไปคำนวณแบบ synchronous ธรรมดาแล้วก็พ่นออกมา

ฝั่งออนไลน์

ตัวนี้ซับซ้อนกว่าฝั่งออฟไลน์หน่อย กล่องเยอะครับ เริ่มจากกล่อง DDoS protector ที่เราก็แค่ใช้ debounceTime มากรอง ทำให้เวลา user เปลี่ยนแปลงค่า input รัวๆ เนี่ยเราจะรอจนค่านิ่งแล้วไม่เปลี่ยนแปลงสัก 0.5 วินาทีแล้วค่อยส่งค่าต่อไป จะได้ไม่หนัก server เรา

ออกจาก DDoS protector มาเราก็ส่งค่าไปคำนวณจริงๆ กับ server ครับ แต่เนื่องจากการส่งค่าไปเนี่ยกินเวลานาน API ที่ใช้เลยเป็น asynchronous ทำให้เราเลือกใช้ switchMap เพื่อเปลี่ยนค่าไปเป็นค่าที่เรายิง API ไปคำนวณและรับผลกลับมาพ่นต่อไปครับ
แม้จะต่างจากฝั่ง offline ที่ใช้ map ธรรมดาในการคำนวณแบบ synchronous แต่ทั้งสองทางก็จะจบที่พ่นค่าออกมาเมื่อคำนวณเสร็จเช่นกัน

RxJS ทำให้เราไม่ต้องกังวลว่าการทำงานเป็น asynchronous หรือ synchronous ครับ ไม่จำเป็นต้องกังวลกับลำดับการทำงานของโค้ดเพราะสุดท้ายแล้ว data ก็จะวิ่งไปตามเส้นทางตามลำดับที่เราเขียน

เนื่องจากมันเป็นการคำนวณโดยต้องพึ่งพาภายนอกจึงอาจมีความผิดพลาดเกิดขึ้นได้ เราจึงมี operator อีกตัวนึงที่ใช้ retryWhen มารวมกับ delay ครับ เพื่อที่เมื่อค่าที่ได้รับกลับมาจาก server นั้นไม่ใช่ผลลัพธ์แต่เป็น error ต่างๆ ทั้งติดต่อ server ไม่ได้หรืออะไรบางอย่างเนี่ยโปรแกรมเราจะไม่น็อคแต่จะพยายามส่งค่าไปให้ server คำนวณใหม่เรื่อยๆ โดยเว้นระยะเวลาตามที่เราใส่ delay ไว้

ผลลัพธ์

แม้เราจะได้ผลลัพธ์ออกมาจากทั้งสองทางคือออนไลน์และออฟไลน์ที่ทำงานต่างกันเป็นแบบ asynchronous และ synchronous แต่นั่นก็ไม่ใช่ปัญหาครับอย่างที่ผมบอกแล้วว่า RxJS ไม่แคร์ว่าเราทำงานกับ synchronous หรือ asynchronous เพราะเราใช้ operator merge มารวมค่าทั้งสองสายเข้าด้วยกันเป็นท่อเดียว ค่าที่พ่นออกมาก็จะเป็นค่าที่ได้รับหลังสุดจากทั้งสองสายแล้วเราก็นำไปเป็น source ให้ช่องผลลัพธ์ก็เป็นอันเรียบร้อยครับ

Too complicated calculator code -> https://codepen.io/invition/pen/gOPJdzP

Too complicated calculator on CodePen

เมื่อ source และ output เริ่มเยอะขึ้นการทำงานก็จะดูเรียบง่ายขึ้นไปอีกเมื่อเทียบกับการเขียนโค้ดแบบเดิมๆ เนื่องจาก data แต่ละตัวก็มีเส้นทางของตัวเองชัดเจนอยู่โดยไม่ต้องสนใจลำดับในโค้ดทั้งการทำงานแต่ละส่วนก่อนหรือหลัง หรือตรงไหนจะเสร็จก่อน/หลังของ asynchronous เราแค่วาง data stream ให้ถูกก็พอแล้ว

circuit diagram เมื่อมี source, output มากขึ้น
ตัวอย่าง circuit diagram ของวงจรที่มีหลาย input และ output ที่ data stream มีความเกี่ยวข้องกัน

--

--

Ittipol Thirasat

เจเนอรัลเบ๊ผู้เชื่อว่าโลกขับเคลื่อนด้วยความขี้เกียจของมนุษยชาติ