An attempt at making a modern feed reader.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

197 lines
5.0 KiB

  1. use std::env;
  2. use std::error::Error;
  3. use std::ops::Deref;
  4. use std::sync::mpsc;
  5. use std::thread;
  6. use chrono::prelude::*;
  7. use chrono::{Duration, Utc};
  8. use diesel::pg::PgConnection;
  9. use diesel::prelude::*;
  10. use dotenv::dotenv;
  11. use feed_rs::{model as feed_model, parser as feed_parser};
  12. use crate::models::feed::{
  13. get_all_feeds, get_undownloaded_feeds, update_data, update_last_updated, Feed,
  14. };
  15. pub mod timer;
  /// Commands that can be sent over the channel to the downloader loop.
  #[derive(Debug, PartialEq)]
  pub enum DownloaderInstruction {
      /// Regular run over all feeds; feeds downloaded within the last
      /// 30 minutes are skipped.
      StartDownload,
      /// Run over all feeds without skipping recently downloaded ones.
      ForceDownloadAll,
      /// Run only over the feeds returned by `get_undownloaded_feeds`.
      DownloadUndownloaded,
  }
  22. pub struct DownloaderInstructor(pub mpsc::Sender<DownloaderInstruction>);
  23. impl Deref for DownloaderInstructor {
  24. type Target = mpsc::Sender<DownloaderInstruction>;
  25. fn deref(&self) -> &mpsc::Sender<DownloaderInstruction> {
  26. &self.0
  27. }
  28. }
  29. pub fn start(receiver: mpsc::Receiver<DownloaderInstruction>) {
  30. dotenv().ok();
  31. let database_url = env::var("DATABASE_URL").unwrap();
  32. let conn = PgConnection::establish(&database_url).unwrap();
  33. loop {
  34. println!("[DL] Awaiting downloader instruction.");
  35. let instruction = match receiver.recv() {
  36. Ok(value) => value,
  37. Err(err) => {
  38. println!("[DL] An error occurred receiving the downloader instruction, exiting: {}", err);
  39. break;
  40. }
  41. };
  42. println!("[DL] Received instruction {:?}, starting.", instruction);
  43. let force_download = instruction == DownloaderInstruction::ForceDownloadAll;
  44. let undownloaded_only =
  45. instruction == DownloaderInstruction::DownloadUndownloaded;
  46. if let Err(err) = download_and_save_feeds(
  47. &conn,
  48. force_download,
  49. undownloaded_only,
  50. Some(&receiver),
  51. ) {
  52. println!("[DL] An error occurred downloading all feeds: {}", err);
  53. }
  54. }
  55. }
  56. pub fn download_feed_data(url: &str) -> String {
  57. match reqwest::blocking::get(url) {
  58. Ok(value) => value.text().unwrap_or_default(),
  59. Err(err) => {
  60. println!("[DL] Error downloading feed: {}", err);
  61. String::new()
  62. }
  63. }
  64. }
  65. pub fn download_and_save_feeds(
  66. conn: &PgConnection,
  67. force_download: bool,
  68. undownloaded_only: bool,
  69. receiver: Option<&mpsc::Receiver<DownloaderInstruction>>,
  70. ) -> Result<(), Box<dyn Error>> {
  71. let feeds = {
  72. if undownloaded_only {
  73. get_undownloaded_feeds(&conn)
  74. } else {
  75. get_all_feeds(&conn, false, None)
  76. }
  77. }
  78. .unwrap_or_default();
  79. if feeds.is_empty() {
  80. println!("[DL] No feeds found, doing nothing.");
  81. return Ok(());
  82. }
  83. let now = Utc::now();
  84. println!("[DL] {} feeds found.", feeds.len());
  85. for mut feed in feeds {
  86. if !force_download
  87. && feed.last_downloaded
  88. > now.timestamp_millis() - Duration::minutes(30).num_milliseconds()
  89. {
  90. println!("[DL] Skipping {} as it was downloaded recently.", feed.id);
  91. continue;
  92. }
  93. if let Some(receiver) = receiver {
  94. let instruction = match receiver.try_recv() {
  95. Ok(value) => {
  96. println!("[DL] Received instruction {:?} while downloading.", value,);
  97. Some(value)
  98. }
  99. Err(err) => {
  100. if err != mpsc::TryRecvError::Empty {
  101. println!("[DL] Error attempting to receive from channel: {}", err);
  102. break;
  103. }
  104. None
  105. }
  106. };
  107. if let Some(instruction) = instruction {
  108. // If we receive the force download all instruction and we're not already force downloading
  109. // run this same function with force download all enabled.
  110. if instruction == DownloaderInstruction::ForceDownloadAll
  111. && !force_download
  112. {
  113. println!(
  114. "[DL] Restarting the downloader with force download enabled."
  115. );
  116. download_and_save_feeds(&conn, true, false, Some(&receiver)).unwrap();
  117. break;
  118. } else {
  119. // Otherwise just print that we're ignoring the instruction.
  120. println!("[DL] Ignoring the {:?} instruction.", instruction);
  121. }
  122. }
  123. }
  124. let data = download_feed_data(feed.url.as_str());
  125. if data == "" {
  126. continue;
  127. }
  128. update_data(&conn, feed.id, data.clone())?;
  129. feed.data = Some(data.clone());
  130. calculate_and_save_last_updated(&conn, &feed);
  131. println!("[DL] Downloaded feed {}, sleeping for 1 second.", feed.id);
  132. thread::sleep(Duration::seconds(1).to_std()?);
  133. }
  134. Ok(())
  135. }
  136. pub fn calculate_last_updated(
  137. channel: &feed_model::Feed,
  138. ) -> Option<DateTime<Utc>> {
  139. let mut date = None;
  140. if let Some(entry) = channel.entries.first() {
  141. date = entry.published;
  142. } else if let Some(updated) = channel.updated {
  143. date = Some(updated);
  144. }
  145. date
  146. }
  147. pub fn calculate_and_save_last_updated(
  148. conn: &PgConnection,
  149. feed: &Feed,
  150. ) -> Option<DateTime<Utc>> {
  151. let feed_data = feed.data.clone();
  152. if let Ok(channel) =
  153. feed_parser::parse(feed_data.unwrap_or_default().as_bytes())
  154. {
  155. let date_to_save = calculate_last_updated(&channel);
  156. if let Some(date) = date_to_save {
  157. if let Err(err) =
  158. update_last_updated(&conn, feed.id, date.timestamp_millis())
  159. {
  160. println!("{}", err);
  161. }
  162. }
  163. return date_to_save;
  164. }
  165. None
  166. }